package org.tiny.autounit.core.data.transport.server.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.tiny.autounit.core.data.transport.server.domain.MessageType;
import org.tiny.autounit.core.data.transport.server.domain.NettyMessage;

/**
 * @author shichaoyang
 * @Description:
 * @date 2019-01-30 13:54
 */
public class HeartBeatResponseHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(HeartBeatResponseHandler.class);

    private ConcurrentMap<String, AtomicInteger> concurrentMap = new ConcurrentHashMap();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        //收到客户端心跳包，计数清零
        if (message != null && message.getType() == MessageType.HEARTBEAT.value()) {
            logger.error("收到客户端["+ctx.channel().remoteAddress().toString()+"]心跳 : ---> " + message);
            String channelId = ctx.channel().id().asLongText();
            concurrentMap.putIfAbsent(channelId, new AtomicInteger(1));
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        HeartBeatHelper.processIdleEvent(ctx
                , evt
                , () -> processReadIdle(ctx)
                , () -> processWriteIdle(ctx)
                , () -> processAllIdle(ctx)
        );
    }

    /**
     * 一段时间没收到客户端消息
     * @param ctx
     */
    private void processReadIdle(ChannelHandlerContext ctx){
        //尝试给客户端发一次心跳
        ctx.writeAndFlush(HeartBeatHelper.buildHeartBeat());
        logger.error("没有收到客户端["+ctx.channel().remoteAddress().toString()+"]心跳，尝试发送一条...");
        //同时进行计数，计数超过3次，认为客户端下线
        String channelId = ctx.channel().id().asLongText();
        AtomicInteger counter = concurrentMap.get(channelId);
        if (null == counter){
            counter = new AtomicInteger(1);
            concurrentMap.putIfAbsent(channelId, counter);
        }
        int times = counter.getAndIncrement();
        if (times > 3){
            logger.error("客户端["+ctx.channel().remoteAddress().toString()+"]已经掉线，断开连接...");
            ctx.close();
        }
    }

    /**
     * 一段时间没发送消息给客户端
     * @param ctx
     */
    private void processWriteIdle(ChannelHandlerContext ctx) {
        //尝试给客户端发一次心跳
        ctx.writeAndFlush(HeartBeatHelper.buildHeartBeat());
    }

    /**
     * 一段时间和客户端没任何交互，暂不处理
     * @param ctx
     */
    private void processAllIdle(ChannelHandlerContext ctx) {
    }

}
